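In the previous parts we built a fully functional AI assistant system, but once it goes live you will face three soul-searching questions.
This part builds a production-grade observability stack so that you know the system inside out and can detect, locate, and resolve problems as soon as they appear.
Goal: move from "the system runs" to "the system runs well, is easy to see into, and costs less".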
flowchart TB
    subgraph "User layer"
        USER[User request]
        WEBAPP[Web app]
    end
    subgraph "Application layer"
        CHATSERVICE[Chat Service]
        MEMSERVICE[Memory Service]
        TOOLEXEC[Tool Executor]
    end
    subgraph "AI layer"
        GEMINI[Gemini API]
        DISCOVERY[Discovery Engine]
    end
    subgraph "Three pillars of observability"
        direction LR
        LOGS[📝 Structured logs<br/>Cloud Logging]
        METRICS[📊 Metrics<br/>Cloud Monitoring]
        TRACES[🔍 Distributed tracing<br/>Cloud Trace]
    end
    subgraph "Analysis and action"
        DASHBOARD[📈 Live dashboard]
        ALERT[🚨 Smart alerting]
        BILLING[💰 Real cost<br/>Billing Export]
        ANALYSIS[Cost analysis]
        OPTIMIZE[⚡ Automatic optimization]
    end
    USER --> WEBAPP --> CHATSERVICE
    CHATSERVICE --> GEMINI
    CHATSERVICE --> DISCOVERY
    CHATSERVICE --> TOOLEXEC
    CHATSERVICE -.logs.-> LOGS
    CHATSERVICE -.metrics.-> METRICS
    CHATSERVICE -.traces.-> TRACES
    GEMINI -.API usage.-> METRICS
    DISCOVERY -.retrieval quality.-> METRICS
    LOGS --> DASHBOARD
    METRICS --> DASHBOARD
    TRACES --> DASHBOARD
    METRICS --> ALERT
    LOGS --> ALERT
    BILLING --> ANALYSIS
    ANALYSIS --> OPTIMIZE
    OPTIMIZE -.tune.-> CHATSERVICE
| Pillar | Purpose | Implementation | Key metrics |
|---|---|---|---|
| 📝 Logs | What happened? | Cloud Logging + jsonPayload | Error rate, exception stack traces |
| 📊 Metrics | How much? | Cloud Monitoring + DELTA/GAUGE | Latency, QPS, token usage |
| 🔍 Traces | Where is the bottleneck? | Cloud Trace + OpenTelemetry | End-to-end latency, dependencies |
# shared/logging_config.py
import logging
import traceback
from datetime import datetime
from typing import Any, Dict, Optional
from contextvars import ContextVar
import hashlib
from google.cloud import logging as cloud_logging
from google.cloud.logging_v2.handlers import StructuredLogHandler
# Track per-request context with ContextVar
request_id_var: ContextVar[str] = ContextVar('request_id', default='')
user_id_var: ContextVar[str] = ContextVar('user_id', default='')
chat_id_var: ContextVar[str] = ContextVar('chat_id', default='')
trace_id_var: ContextVar[str] = ContextVar('trace_id', default='')
class StructuredLogger:
    """Production-grade structured logger."""

    def __init__(self, name: str, project_id: str):
        self.logger = logging.getLogger(name)
        self.project_id = project_id
        self._configure_cloud_logging()

    def _configure_cloud_logging(self):
        """Configure GCP Cloud Logging."""
        try:
            # StructuredLogHandler emits JSON that Cloud Logging ingests as jsonPayload
            handler = StructuredLogHandler()
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)
        except Exception:
            # Fall back to a plain-text stream handler
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            ))
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)
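    def _build_log_entry(
        self,
        message: str,
        level: str,
        extra: Optional[Dict[str, Any]] = None,
        error: Optional[Exception] = None
    ) -> Dict[str, Any]:
        """Build a structured log entry."""
        trace_id = trace_id_var.get()
        log_entry = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "severity": level,
            "message": message,
            "service": "ai-assistant",
            "request_id": request_id_var.get(),
            "user_id": user_id_var.get(),
            "chat_id": chat_id_var.get(),
            # Only link to a trace when a trace ID is actually set
            "logging.googleapis.com/trace": (
                f"projects/{self.project_id}/traces/{trace_id}" if trace_id else ""
            ),
            **(extra or {})
        }
        if error is not None:
            log_entry["error"] = {
                "type": type(error).__name__,
                "message": str(error),
                # Format the exception that was passed in; traceback.format_exc()
                # only reports the exception currently being handled
                "stacktrace": "".join(traceback.format_exception(
                    type(error), error, error.__traceback__
                ))
            }
        # Drop empty strings and None, but keep legitimate zeros such as tokens_used=0
        return {k: v for k, v in log_entry.items() if v is not None and v != ""}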
    def _mask_sensitive_data(self, text: str) -> Dict[str, Any]:
        """Mask sensitive data: log a short hash and the length, never the text itself."""
        return {
            "content_hash": hashlib.sha256(text.encode()).hexdigest()[:16],
            "content_length": len(text),
        }

    def info(self, message: str, **kwargs):
        """Log at INFO level."""
        entry = self._build_log_entry(message, "INFO", kwargs)
        self.logger.info(entry)

    def warning(self, message: str, **kwargs):
        """Log at WARNING level."""
        entry = self._build_log_entry(message, "WARNING", kwargs)
        self.logger.warning(entry)

    def error(self, message: str, error: Optional[Exception] = None, **kwargs):
        """Log at ERROR level."""
        entry = self._build_log_entry(message, "ERROR", kwargs, error)
        self.logger.error(entry)

    def critical(self, message: str, error: Optional[Exception] = None, **kwargs):
        """Log at CRITICAL level."""
        entry = self._build_log_entry(message, "CRITICAL", kwargs, error)
        self.logger.critical(entry)
    def user_action(self, action: str, **kwargs):
        """Log a user action, dropping fields that may contain PII."""
        safe_kwargs = {k: v for k, v in kwargs.items()
                       if k not in ['message_text', 'user_email', 'phone']}
        self.info(
            f"User action: {action}",
            log_type="user_action",
            action=action,
            **safe_kwargs
        )

    def ai_interaction(
        self,
        interaction_type: str,
        model: str,
        tokens_used: int = 0,
        latency_ms: int = 0,
        **kwargs
    ):
        """Log an AI interaction."""
        self.info(
            f"AI interaction: {interaction_type}",
            log_type="ai_interaction",
            interaction_type=interaction_type,
            model=model,
            tokens_used=tokens_used,
            latency_ms=latency_ms,
            **kwargs
        )

    def tool_execution(
        self,
        tool_name: str,
        status: str,
        execution_time_ms: int = 0,
        **kwargs
    ):
        """Log a tool execution; parameters are replaced by a short hash."""
        safe_kwargs = kwargs.copy()
        if 'parameters' in safe_kwargs:
            safe_kwargs['parameters_hash'] = hashlib.sha256(
                str(safe_kwargs['parameters']).encode()
            ).hexdigest()[:16]
            safe_kwargs.pop('parameters')
        self.info(
            f"Tool execution: {tool_name} - {status}",
            log_type="tool_execution",
            tool_name=tool_name,
            status=status,
            execution_time_ms=execution_time_ms,
            **safe_kwargs
        )

    def cost_tracking(
        self,
        service: str,
        operation: str,
        estimated_cost_usd: float,
        **kwargs
    ):
        """Log a cost-tracking entry (an estimate only)."""
        self.info(
            f"Cost tracking: {service} - {operation}",
            log_type="cost_tracking",
            service=service,
            operation=operation,
            estimated_cost_usd=estimated_cost_usd,
            is_estimate=True,
            **kwargs
        )
_logger: Optional[StructuredLogger] = None


def get_logger(project_id: str) -> StructuredLogger:
    """Return the global logger instance."""
    global _logger
    if _logger is None:
        _logger = StructuredLogger("ai-assistant", project_id)
    return _logger


def set_request_context(
    request_id: str,
    user_id: str = "",
    chat_id: str = "",
    trace_id: str = ""
):
    """Set the request context."""
    request_id_var.set(request_id)
    user_id_var.set(user_id)
    chat_id_var.set(chat_id)
    trace_id_var.set(trace_id)


def clear_request_context():
    """Clear the request context."""
    request_id_var.set("")
    user_id_var.set("")
    chat_id_var.set("")
    trace_id_var.set("")
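The logger relies on `ContextVar` so that concurrent requests never see each other's IDs. The following is a minimal, standard-library-only sketch of that behaviour; `handle` and `main` are hypothetical names used only for illustration and are not part of the module above.

```python
import asyncio
from contextvars import ContextVar

request_id_var: ContextVar[str] = ContextVar("request_id", default="")


async def handle(request_id: str, delay: float) -> str:
    """Simulate one request: set the context, yield control, read it back."""
    request_id_var.set(request_id)
    await asyncio.sleep(delay)  # other requests run while this one waits
    return request_id_var.get()


async def main() -> list:
    # gather() wraps each coroutine in its own task, and every task
    # works on its own copy of the context
    return await asyncio.gather(handle("req-a", 0.02), handle("req-b", 0.01))


results = asyncio.run(main())
print(results)
```

Each task reads back the ID it set itself, even though the two requests interleave, and the value outside the tasks stays at its default.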
# services/chat/app/middleware.py
import base64
import json
import time
import uuid

import jwt
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware

from shared.logging_config import get_logger, set_request_context, clear_request_context


class ObservabilityMiddleware(BaseHTTPMiddleware):
    """Observability middleware."""

    def __init__(self, app, project_id: str):
        super().__init__(app)
        self.logger = get_logger(project_id)
        self.project_id = project_id

    def _extract_user_id(self, request: Request) -> str:
        """Extract user_id from a JWT or OIDC header (used for log correlation only)."""
        # 1. API Gateway / Cloud Run OIDC header (base64url-encoded JSON, possibly unpadded)
        userinfo_header = request.headers.get("X-Apigateway-Api-Userinfo")
        if userinfo_header:
            try:
                padded = userinfo_header + "=" * (-len(userinfo_header) % 4)
                userinfo = json.loads(base64.urlsafe_b64decode(padded))
                return userinfo.get("sub", "anonymous")
            except Exception:
                pass

        # 2. Authorization Bearer token
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            try:
                token = auth_header.split(" ", 1)[1]
                # The signature is NOT verified here: the result is only a log label
                # and must never be used for authorization decisions
                decoded = jwt.decode(token, options={"verify_signature": False})
                return decoded.get("sub") or decoded.get("user_id", "anonymous")
            except Exception:
                pass

        # 3. Fallback (client-supplied and therefore untrusted)
        return request.headers.get("X-User-ID", "anonymous")

    async def dispatch(self, request: Request, call_next):
        request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
        # X-Cloud-Trace-Context looks like "TRACE_ID/SPAN_ID;o=1"; a trace ID is 32 hex characters
        trace_id = request.headers.get("X-Cloud-Trace-Context", "").split("/")[0] or uuid.uuid4().hex
        user_id = self._extract_user_id(request)
        set_request_context(request_id, user_id, trace_id=trace_id)
        start_time = time.time()
        self.logger.info(
            "Request started",
            method=request.method,
            path=request.url.path,
            # request.client can be None (for example under some test clients)
            client_ip=request.client.host if request.client else "",
            user_agent=request.headers.get("user-agent", "")[:200]
        )
        try:
            response = await call_next(request)
            processing_time_ms = int((time.time() - start_time) * 1000)
            self.logger.info(
                "Request completed",
                method=request.method,
                path=request.url.path,
                status_code=response.status_code,
                processing_time_ms=processing_time_ms
            )
            response.headers["X-Request-ID"] = request_id
            response.headers["X-Processing-Time-Ms"] = str(processing_time_ms)
            return response
        except Exception as e:
            processing_time_ms = int((time.time() - start_time) * 1000)
            self.logger.error(
                "Request failed",
                error=e,
                method=request.method,
                path=request.url.path,
                processing_time_ms=processing_time_ms
            )
            raise
        finally:
            clear_request_context()
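The middleware takes the trace ID from the `X-Cloud-Trace-Context` header, whose documented shape is `TRACE_ID/SPAN_ID;o=OPTIONS`. A slightly stricter parser can be sketched as follows; `parse_trace_header` is a hypothetical helper, not part of the middleware above, and it assumes that a malformed header should simply start a fresh, unsampled trace.

```python
import re
import uuid
from typing import Optional, Tuple

# TRACE_ID: 32 hex characters; SPAN_ID: decimal number; o=1 means "sampled"
_TRACE_HEADER = re.compile(
    r"^(?P<trace>[0-9a-fA-F]{32})(?:/(?P<span>\d+))?(?:;o=(?P<opt>\d))?$"
)


def parse_trace_header(header: Optional[str]) -> Tuple[str, Optional[str], bool]:
    """Return (trace_id, span_id, sampled) for an X-Cloud-Trace-Context value."""
    match = _TRACE_HEADER.match(header or "")
    if not match:
        # Missing or malformed header: generate a valid 32-hex trace ID
        return uuid.uuid4().hex, None, False
    return match["trace"].lower(), match["span"], match["opt"] == "1"


parsed = parse_trace_header("105445aa7843bc8bf206b12000100000/1;o=1")
fallback = parse_trace_header("not-a-trace-header")
```

Validating the header keeps arbitrary client input out of the `logging.googleapis.com/trace` field, which only links to Cloud Trace when the ID is well formed.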
# shared/metrics_client.py
from google.cloud import monitoring_v3
from google.api import metric_pb2 as ga_metric
from google.api import label_pb2 as ga_label
import time
import os
from typing import Dict, Any, Optional


class MetricsClient:
    """Custom metrics client."""

    def __init__(self, project_id: str):
        self.project_id = project_id
        self.project_name = f"projects/{project_id}"
        self.client = monitoring_v3.MetricServiceClient()
        self.resource_type, self.resource_labels = self._detect_environment()
        self._ensure_metric_descriptors()

    def _detect_environment(self) -> tuple:
        """Detect the runtime environment and pick a matching monitored resource."""
        if os.getenv("K_SERVICE"):
            return "cloud_run_revision", {
                "project_id": self.project_id,
                "service_name": os.getenv("K_SERVICE", "unknown"),
                "revision_name": os.getenv("K_REVISION", "unknown"),
                "location": os.getenv("CLOUD_RUN_LOCATION", "asia-east1")
            }
        elif os.getenv("KUBERNETES_SERVICE_HOST"):
            return "k8s_container", {
                "project_id": self.project_id,
                "location": os.getenv("GKE_LOCATION", "us-central1"),
                "cluster_name": os.getenv("GKE_CLUSTER", "default"),
                "namespace_name": os.getenv("K8S_NAMESPACE", "default"),
                "pod_name": os.getenv("HOSTNAME", "unknown"),
                "container_name": "app"
            }
        else:
            return "global", {"project_id": self.project_id}
    def _ensure_metric_descriptors(self):
        """Make sure the metric descriptors exist."""
        descriptors = [
            {
                "type": "custom.googleapis.com/ai_assistant/interaction_latency",
                "metric_kind": ga_metric.MetricDescriptor.MetricKind.GAUGE,
                "value_type": ga_metric.MetricDescriptor.ValueType.DOUBLE,
                "description": "AI interaction latency in milliseconds",
                "display_name": "AI Interaction Latency",
                "unit": "ms",
                "labels": [
                    ga_label.LabelDescriptor(
                        key="model",
                        value_type=ga_label.LabelDescriptor.ValueType.STRING
                    ),
                    ga_label.LabelDescriptor(
                        key="interaction_type",
                        value_type=ga_label.LabelDescriptor.ValueType.STRING
                    )
                ]
            },
            {
                "type": "custom.googleapis.com/ai_assistant/tokens_used",
                "metric_kind": ga_metric.MetricDescriptor.MetricKind.DELTA,
                "value_type": ga_metric.MetricDescriptor.ValueType.INT64,
                "description": "Tokens used per interaction",
                "display_name": "Tokens Used",
                "unit": "1",
                "labels": [
                    ga_label.LabelDescriptor(
                        key="model",
                        value_type=ga_label.LabelDescriptor.ValueType.STRING
                    ),
                    ga_label.LabelDescriptor(
                        key="token_type",
                        value_type=ga_label.LabelDescriptor.ValueType.STRING
                    )
                ]
            },
            {
                "type": "custom.googleapis.com/ai_assistant/tool_execution_count",
                "metric_kind": ga_metric.MetricDescriptor.MetricKind.DELTA,
                "value_type": ga_metric.MetricDescriptor.ValueType.INT64,
                "description": "Tool execution count",
                "display_name": "Tool Execution Count",
                "unit": "1",
                "labels": [
                    ga_label.LabelDescriptor(
                        key="tool_name",
                        value_type=ga_label.LabelDescriptor.ValueType.STRING
                    ),
                    ga_label.LabelDescriptor(
                        key="status",
                        value_type=ga_label.LabelDescriptor.ValueType.STRING
                    )
                ]
            },
            {
                "type": "custom.googleapis.com/ai_assistant/estimated_cost",
                "metric_kind": ga_metric.MetricDescriptor.MetricKind.DELTA,
                "value_type": ga_metric.MetricDescriptor.ValueType.DOUBLE,
                "description": "Estimated cost per operation (USD)",
                "display_name": "Estimated Cost",
                "unit": "USD",
                "labels": [
                    ga_label.LabelDescriptor(
                        key="service",
                        value_type=ga_label.LabelDescriptor.ValueType.STRING
                    ),
                    ga_label.LabelDescriptor(
                        key="operation",
                        value_type=ga_label.LabelDescriptor.ValueType.STRING
                    )
                ]
            },
            {
                "type": "custom.googleapis.com/ai_assistant/user_satisfaction",
                "metric_kind": ga_metric.MetricDescriptor.MetricKind.GAUGE,
                "value_type": ga_metric.MetricDescriptor.ValueType.DOUBLE,
                "description": "User satisfaction score (1-5)",
                "display_name": "User Satisfaction",
                "unit": "1",
                "labels": [
                    ga_label.LabelDescriptor(
                        key="user_id",
                        value_type=ga_label.LabelDescriptor.ValueType.STRING
                    )
                ]
            }
        ]
        for descriptor_config in descriptors:
            try:
                descriptor = ga_metric.MetricDescriptor(
                    type=descriptor_config["type"],
                    metric_kind=descriptor_config["metric_kind"],
                    value_type=descriptor_config["value_type"],
                    description=descriptor_config["description"],
                    display_name=descriptor_config["display_name"],
                    unit=descriptor_config.get("unit", "1"),
                    labels=descriptor_config["labels"]
                )
                self.client.create_metric_descriptor(
                    name=self.project_name,
                    metric_descriptor=descriptor
                )
            except Exception as e:
                # Descriptor setup must not block startup, but the failure should be visible
                print(f"⚠️ Failed to create metric descriptor {descriptor_config['type']}: {e}")
    def record_interaction_latency(
        self,
        latency_ms: float,
        model: str,
        interaction_type: str
    ):
        """Record AI interaction latency."""
        self._write_time_series(
            "custom.googleapis.com/ai_assistant/interaction_latency",
            float(latency_ms),  # DOUBLE metric: coerce so an int is not written as INT64
            {"model": model, "interaction_type": interaction_type},
            metric_kind="GAUGE"
        )

    def record_tokens_used(self, tokens: int, model: str, token_type: str):
        """Record token usage."""
        self._write_time_series(
            "custom.googleapis.com/ai_assistant/tokens_used",
            int(tokens),
            {"model": model, "token_type": token_type},
            metric_kind="DELTA"
        )

    def record_tool_execution(self, tool_name: str, status: str):
        """Record one tool execution."""
        self._write_time_series(
            "custom.googleapis.com/ai_assistant/tool_execution_count",
            1,
            {"tool_name": tool_name, "status": status},
            metric_kind="DELTA"
        )

    def record_estimated_cost(self, cost_usd: float, service: str, operation: str):
        """Record the estimated cost."""
        self._write_time_series(
            "custom.googleapis.com/ai_assistant/estimated_cost",
            float(cost_usd),  # DOUBLE metric
            {"service": service, "operation": operation},
            metric_kind="DELTA"
        )

    def record_user_satisfaction(self, score: float, user_id: str):
        """Record a user satisfaction score."""
        self._write_time_series(
            "custom.googleapis.com/ai_assistant/user_satisfaction",
            float(score),  # DOUBLE metric
            {"user_id": user_id},
            metric_kind="GAUGE"
        )
    def _write_time_series(
        self,
        metric_type: str,
        value: float,
        labels: Dict[str, str],
        metric_kind: str = "GAUGE"
    ):
        """Write a single point to a time series."""
        try:
            series = monitoring_v3.TimeSeries()
            series.metric.type = metric_type
            series.resource.type = self.resource_type
            for key, val in self.resource_labels.items():
                series.resource.labels[key] = val
            for key, val in labels.items():
                series.metric.labels[key] = val

            now = time.time()
            seconds = int(now)
            nanos = int((now - seconds) * 10 ** 9)
            # Build the interval and point from dicts, as the client library samples do
            interval_fields = {"end_time": {"seconds": seconds, "nanos": nanos}}
            if metric_kind == "DELTA":
                # DELTA points need a non-empty interval; a 60-second window is assumed here
                interval_fields["start_time"] = {"seconds": seconds - 60, "nanos": nanos}
            interval = monitoring_v3.TimeInterval(interval_fields)

            if isinstance(value, float):
                typed_value = {"double_value": value}
            else:
                typed_value = {"int64_value": int(value)}
            point = monitoring_v3.Point({"interval": interval, "value": typed_value})
            series.points = [point]

            self.client.create_time_series(
                name=self.project_name,
                time_series=[series]
            )
        except Exception as e:
            # A failed metric write must never break the request path
            print(f"⚠️ Failed to write metric: {e}")


_metrics_client: Optional[MetricsClient] = None


def get_metrics_client(project_id: str) -> MetricsClient:
    """Return the global metrics client."""
    global _metrics_client
    if _metrics_client is None:
        _metrics_client = MetricsClient(project_id)
    return _metrics_client
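The client above issues one `create_time_series` call per recorded value, and Cloud Monitoring limits how often points can be written to a single time series. One possible mitigation, sketched here under stated assumptions, is to accumulate counter values in memory and flush the totals at a fixed interval. `CounterBuffer`, `flush_fn`, and the 10-second default are hypothetical; in practice `flush_fn` could forward to something like `MetricsClient._write_time_series`.

```python
import threading
import time
from collections import defaultdict
from typing import Callable, Dict


class CounterBuffer:
    """Accumulate counter increments and flush the totals at most once per interval."""

    def __init__(
        self,
        flush_fn: Callable[[str, float, Dict[str, str]], None],
        min_interval_s: float = 10.0,          # assumed interval, tune to your quota
        clock: Callable[[], float] = time.monotonic,
    ):
        self._flush_fn = flush_fn
        self._min_interval_s = min_interval_s
        self._clock = clock
        self._lock = threading.Lock()
        self._totals = defaultdict(float)
        self._last_flush = clock()

    def add(self, metric_type: str, value: float, labels: Dict[str, str]) -> None:
        # One bucket per (metric, label set), matching one time series each
        key = (metric_type, tuple(sorted(labels.items())))
        with self._lock:
            self._totals[key] += value
            due = self._clock() - self._last_flush >= self._min_interval_s
        if due:
            self.flush()

    def flush(self) -> None:
        with self._lock:
            pending, self._totals = self._totals, defaultdict(float)
            self._last_flush = self._clock()
        for (metric_type, label_items), total in pending.items():
            self._flush_fn(metric_type, total, dict(label_items))


# Demo with a fake clock and an in-memory sink instead of the real API
now = [0.0]
written = []
buf = CounterBuffer(lambda m, v, l: written.append((m, v, l)),
                    min_interval_s=10, clock=lambda: now[0])
buf.add("tokens_used", 100, {"model": "flash"})
buf.add("tokens_used", 50, {"model": "flash"})
written_before_interval = list(written)
now[0] = 11.0
buf.add("tokens_used", 25, {"model": "flash"})
```

A real deployment would also call `flush()` on shutdown so the last partial interval is not lost.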
# shared/tracing_config.py
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from typing import Optional
import os
import functools


class TracingConfig:
    """Distributed tracing configuration."""

    def __init__(
        self,
        project_id: str,
        service_name: str = "ai-assistant",
        sample_rate: float = 0.1
    ):
        self.project_id = project_id
        self.service_name = service_name
        self.sample_rate = sample_rate
        self._setup_tracing()

    def _setup_tracing(self):
        """Set up Cloud Trace."""
        resource = Resource.create({
            "service.name": self.service_name,
            "service.version": "2.0.0"
        })
        # Honour the parent's sampling decision; sample root spans at sample_rate
        sampler = ParentBased(root=TraceIdRatioBased(self.sample_rate))
        tracer_provider = TracerProvider(resource=resource, sampler=sampler)
        cloud_trace_exporter = CloudTraceSpanExporter(project_id=self.project_id)
        tracer_provider.add_span_processor(
            BatchSpanProcessor(cloud_trace_exporter)
        )
        trace.set_tracer_provider(tracer_provider)

    def instrument_fastapi(self, app):
        """Add tracing to FastAPI."""
        FastAPIInstrumentor.instrument_app(app)

    def instrument_httpx(self):
        """Add tracing to HTTPX."""
        HTTPXClientInstrumentor().instrument()


_tracer: Optional[trace.Tracer] = None


def get_tracer(project_id: str) -> trace.Tracer:
    """Return the global tracer."""
    global _tracer
    if _tracer is None:
        sample_rate = float(os.getenv("TRACE_SAMPLE_RATE", "0.1"))
        # Constructing TracingConfig registers the global tracer provider as a side effect
        TracingConfig(project_id, sample_rate=sample_rate)
        _tracer = trace.get_tracer(__name__)
    return _tracer


def trace_function(name: str):
    """Tracing decorator for async functions."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            tracer = trace.get_tracer(__name__)
            with tracer.start_as_current_span(name):
                return await func(*args, **kwargs)
        return wrapper
    return decorator
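The `trace_function` decorator always wraps its target in an `async def`, so it only suits coroutine functions. A variant that handles both sync and async callables might look like the sketch below. To stay runnable without OpenTelemetry installed, it uses a hypothetical `fake_span` context manager in place of `tracer.start_as_current_span(name)`; `trace_any` and `span_factory` are likewise illustrative names.

```python
import asyncio
import functools
import inspect
from contextlib import contextmanager

recorded = []


@contextmanager
def fake_span(name):
    """Hypothetical stand-in for tracer.start_as_current_span(name)."""
    recorded.append(("start", name))
    try:
        yield
    finally:
        recorded.append(("end", name))


def trace_any(name, span_factory=fake_span):
    """Wrap a sync or async function in a span, preserving its calling style."""
    def decorator(func):
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                with span_factory(name):
                    return await func(*args, **kwargs)
            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            with span_factory(name):
                return func(*args, **kwargs)
        return sync_wrapper
    return decorator


@trace_any("add")
def add(a, b):
    return a + b


@trace_any("fetch")
async def fetch():
    await asyncio.sleep(0)
    return "ok"


sync_result = add(1, 2)
async_result = asyncio.run(fetch())
```

With the real tracer, `span_factory` would be something like `trace.get_tracer(__name__).start_as_current_span`, which is also usable as a context manager.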
#!/bin/bash
# scripts/create-logs-based-metrics.sh
PROJECT_ID="your-project-id"
echo "📊 Creating logs-based metrics..."
# Error count: a counter metric that counts every log entry matching the filter.
# Counter metrics only need a description and a filter.
gcloud logging metrics create error_count \
  --project="$PROJECT_ID" \
  --description="Error log count" \
  --log-filter='severity>=ERROR AND resource.type="cloud_run_revision" AND resource.labels.service_name="chat-service-enhanced"'
echo "✅ Logs-based metrics created"
# monitoring/alert-policies.tf
resource "google_monitoring_alert_policy" "high_latency" {
display_name = "AI Assistant - High Latency"
combiner = "OR"
conditions {
display_name = "Latency > 3000ms"
condition_threshold {
filter = "metric.type=\"custom.googleapis.com/ai_assistant/interaction_latency\" resource.type=\"cloud_run_revision\""
comparison = "COMPARISON_GT"
threshold_value = 3000
duration = "300s"
aggregations {
alignment_period = "60s"
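# ALIGN_PERCENTILE_* applies only to DISTRIBUTION metrics. This metric is a DOUBLE gauge,
# so align each series by its mean and take the 95th percentile across series.
per_series_aligner = "ALIGN_MEAN"
cross_series_reducer = "REDUCE_PERCENTILE_95"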
}
}
}
notification_channels = [google_monitoring_notification_channel.email.name]
alert_strategy {
auto_close = "604800s"
}
}
resource "google_monitoring_alert_policy" "high_error_rate" {
display_name = "AI Assistant - High Error Rate"
combiner = "OR"
conditions {
# ALIGN_RATE on a counter yields errors per second, not a percentage of requests
display_name = "Error rate > 0.05 errors/s"
condition_threshold {
filter = "metric.type=\"logging.googleapis.com/user/error_count\" resource.type=\"cloud_run_revision\""
comparison = "COMPARISON_GT"
threshold_value = 0.05
duration = "180s"
aggregations {
alignment_period = "60s"
per_series_aligner = "ALIGN_RATE"
}
}
}
notification_channels = [google_monitoring_notification_channel.email.name]
}
resource "google_monitoring_notification_channel" "email" {
display_name = "AI Assistant Alerts"
type = "email"
labels = {
email_address = "alerts@example.com"
}
}
{
"displayName": "AI Assistant Production Monitoring",
"mosaicLayout": {
"columns": 12,
"tiles": [
{
"width": 6,
"height": 4,
"widget": {
"title": "🚀 Request latency (P50, P95, P99)",
"xyChart": {
"dataSets": [
{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"custom.googleapis.com/ai_assistant/interaction_latency\" resource.type=\"cloud_run_revision\"",
"aggregation": {
"alignmentPeriod": "60s",
"perSeriesAligner": "ALIGN_MEAN",
"crossSeriesReducer": "REDUCE_PERCENTILE_50"
}
}
},
"plotType": "LINE"
},
{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"custom.googleapis.com/ai_assistant/interaction_latency\" resource.type=\"cloud_run_revision\"",
"aggregation": {
"alignmentPeriod": "60s",
"perSeriesAligner": "ALIGN_MEAN",
"crossSeriesReducer": "REDUCE_PERCENTILE_95"
}
}
},
"plotType": "LINE"
},
{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"custom.googleapis.com/ai_assistant/interaction_latency\" resource.type=\"cloud_run_revision\"",
"aggregation": {
"alignmentPeriod": "60s",
"perSeriesAligner": "ALIGN_MEAN",
"crossSeriesReducer": "REDUCE_PERCENTILE_99"
}
}
},
"plotType": "LINE"
}
]
}
}
},
{
"xPos": 6,
"width": 6,
"height": 4,
"widget": {
"title": "💰 Estimated cost per hour",
"xyChart": {
"dataSets": [
{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"custom.googleapis.com/ai_assistant/estimated_cost\" resource.type=\"cloud_run_revision\"",
"aggregation": {
"alignmentPeriod": "3600s",
"perSeriesAligner": "ALIGN_SUM",
"crossSeriesReducer": "REDUCE_SUM",
"groupByFields": ["metric.label.service"]
}
}
},
"plotType": "STACKED_AREA"
}
]
}
}
},
{
"yPos": 4,
"width": 4,
"height": 4,
"widget": {
"title": "🔧 Successful tool executions (per second)",
"scorecard": {
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"custom.googleapis.com/ai_assistant/tool_execution_count\" metric.label.status=\"success\" resource.type=\"cloud_run_revision\"",
"aggregation": {
"alignmentPeriod": "3600s",
"perSeriesAligner": "ALIGN_RATE"
}
}
}
}
}
},
{
"xPos": 4,
"yPos": 4,
"width": 4,
"height": 4,
"widget": {
"title": "🪙 Token usage (tokens per second)",
"scorecard": {
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"custom.googleapis.com/ai_assistant/tokens_used\" resource.type=\"cloud_run_revision\"",
"aggregation": {
"alignmentPeriod": "3600s",
"perSeriesAligner": "ALIGN_RATE"
}
}
}
}
}
},
{
"xPos": 8,
"yPos": 4,
"width": 4,
"height": 4,
"widget": {
"title": "⭐ User satisfaction",
"scorecard": {
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"custom.googleapis.com/ai_assistant/user_satisfaction\" resource.type=\"cloud_run_revision\"",
"aggregation": {
"alignmentPeriod": "3600s",
"perSeriesAligner": "ALIGN_MEAN"
}
}
},
"gaugeView": {
"lowerBound": 1.0,
"upperBound": 5.0
}
}
}
}
]
}
}
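# shared/cost_analyzer.py
from google.cloud import bigquery
from datetime import datetime, timedelta
from typing import Dict, List, Any


class CostAnalyzer:
    """Cost analyzer."""

    # Reference prices in USD per token; check the current price list before relying on them
    PRICING_REFERENCE = {
        "gemini-1.5-pro": {
            "input": 1.25 / 1_000_000,
            "output": 5.00 / 1_000_000,
        },
        "gemini-1.5-flash": {
            "input": 0.075 / 1_000_000,
            "output": 0.30 / 1_000_000,
        },
    }

    def __init__(self, project_id: str, dataset_id: str = "logging"):
        self.project_id = project_id
        self.bq_client = bigquery.Client(project=project_id)
        # Cloud Run stdout logs exported by a BigQuery sink land in run_googleapis_com_stdout
        # (date-sharded as run_googleapis_com_stdout_YYYYMMDD unless the sink uses partitioned tables)
        self.log_table = f"{project_id}.{dataset_id}.run_googleapis_com_stdout"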
    def analyze_daily_cost_estimate(self, date: str = None) -> Dict[str, Any]:
        """Analyze the daily cost estimate (from application-level logs)."""
        if date is None:
            date = datetime.now().strftime("%Y-%m-%d")
        query = f"""
        SELECT
            jsonPayload.service AS service,
            jsonPayload.operation AS operation,
            SUM(CAST(jsonPayload.estimated_cost_usd AS FLOAT64)) AS total_cost,
            COUNT(*) AS operation_count
        FROM
            `{self.log_table}`
        WHERE
            DATE(timestamp) = @date
            AND jsonPayload.log_type = 'cost_tracking'
        GROUP BY
            1, 2  -- by position, so the aliases cannot clash with top-level log columns
        ORDER BY
            total_cost DESC
        """
        # Bind the date as a query parameter instead of interpolating it into the SQL
        job_config = bigquery.QueryJobConfig(query_parameters=[
            bigquery.ScalarQueryParameter("date", "DATE", date)
        ])
        try:
            results = self.bq_client.query(query, job_config=job_config).to_dataframe()
            return {
                "date": date,
                "total_cost_estimate": float(results["total_cost"].sum()) if len(results) > 0 else 0.0,
                "breakdown": results.to_dict("records"),
                "note": "This is an estimate; check Cloud Billing Export for the real cost"
            }
        except Exception as e:
            return {
                "date": date,
                "error": str(e),
                "total_cost_estimate": 0.0
            }
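    def get_real_cost_from_billing(self, start_date: str, end_date: str) -> Dict[str, Any]:
        """Fetch the real cost from Cloud Billing Export."""
        # Requires Billing Export to BigQuery to be configured first
        billing_table = f"{self.project_id}.billing_export.gcp_billing_export_v1_XXXXXX"
        query = f"""
        SELECT
            service.description AS service,
            SUM(cost) AS total_cost,
            currency
        FROM
            `{billing_table}`
        WHERE
            DATE(_PARTITIONTIME) BETWEEN @start_date AND @end_date
            AND project.id = @project_id
        GROUP BY
            1, 3  -- by position: the alias "service" shadows the service STRUCT column
        ORDER BY
            total_cost DESC
        """
        # Bind values as query parameters instead of interpolating them into the SQL
        job_config = bigquery.QueryJobConfig(query_parameters=[
            bigquery.ScalarQueryParameter("start_date", "DATE", start_date),
            bigquery.ScalarQueryParameter("end_date", "DATE", end_date),
            bigquery.ScalarQueryParameter("project_id", "STRING", self.project_id),
        ])
        try:
            results = self.bq_client.query(query, job_config=job_config).to_dataframe()
            return {
                "period": f"{start_date} to {end_date}",
                "total_cost": float(results["total_cost"].sum()),
                "breakdown": results.to_dict("records"),
                "source": "Cloud Billing Export (real cost)"
            }
        except Exception as e:
            return {
                "error": str(e),
                "note": "Set up Cloud Billing Export first"
            }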
    def get_optimization_suggestions(self) -> List[Dict[str, Any]]:
        """Return cost optimization suggestions."""
        suggestions = [
            {
                "type": "model_selection",
                "priority": "high",
                "title": "Smart model selection",
                "description": "Use Gemini 1.5 Flash for simple queries; at the reference prices above this cuts cost by 94%",
                "implementation": "Pick the model dynamically in the chat handler based on query complexity"
            },
            {
                "type": "caching",
                "priority": "medium",
                "title": "Enable response caching",
                "description": "Cache repeated queries in Redis/Memorystore",
                "estimated_savings": "20-30%"
            },
            {
                "type": "prompt_optimization",
                "priority": "medium",
                "title": "Optimize prompt length",
                "description": "Shorten the System Instruction and trim unnecessary context",
                "estimated_savings": "10-15%"
            },
            {
                "type": "batch_processing",
                "priority": "low",
                "title": "Batch processing",
                "description": "Use the batch API for non-real-time tasks",
                "estimated_savings": "50% on batch workloads"
            }
        ]
        return suggestions
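The 94% figure in the model-selection suggestion follows directly from `PRICING_REFERENCE`. The sketch below reproduces the arithmetic for one request. `estimate_cost_usd` is a hypothetical helper, the token counts are made-up example values, and the prices are the reference values from this article, which may not match the current price list.

```python
# Reference prices copied from CostAnalyzer.PRICING_REFERENCE (USD per token)
PRICING_REFERENCE = {
    "gemini-1.5-pro": {"input": 1.25 / 1_000_000, "output": 5.00 / 1_000_000},
    "gemini-1.5-flash": {"input": 0.075 / 1_000_000, "output": 0.30 / 1_000_000},
}


def estimate_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Estimate the cost of one request from its token counts."""
    price = PRICING_REFERENCE[model]
    return input_tokens * price["input"] + output_tokens * price["output"]


# Example request: 2,000 input tokens and 500 output tokens (illustrative numbers)
pro_cost = estimate_cost_usd("gemini-1.5-pro", 2_000, 500)
flash_cost = estimate_cost_usd("gemini-1.5-flash", 2_000, 500)
savings = 1 - flash_cost / pro_cost

print(f"pro=${pro_cost:.6f} flash=${flash_cost:.6f} savings={savings:.0%}")
```

Because both the input and the output price of Flash are 6% of the Pro price, the saving is the same 94% whatever the input/output mix. The value returned by `estimate_cost_usd` is the kind of number that would be passed to `cost_tracking` as `estimated_cost_usd`.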
#!/bin/bash
# scripts/setup-full-observability.sh
set -e
PROJECT_ID="${1:-your-project-id}"
REGION="${2:-asia-east1}"
EMAIL="${3:-alerts@example.com}"
echo "🚀 Deploying the full observability stack..."
echo "Project: $PROJECT_ID"
echo "Region: $REGION"
echo "Alert email: $EMAIL"
echo ""
# 1. Enable APIs
echo "📡 Enabling required APIs..."
gcloud services enable \
  logging.googleapis.com \
  monitoring.googleapis.com \
  cloudtrace.googleapis.com \
  bigquery.googleapis.com \
  --project="$PROJECT_ID"
# 2. Create the BigQuery dataset
echo "📊 Creating the BigQuery dataset..."
bq mk --dataset \
  --location=US \
  --description="AI Assistant log data" \
  "$PROJECT_ID:logging"
# 3. Create the log sink
echo "📝 Creating the log sink..."
# --use-partitioned-tables writes to one partitioned table per log instead of date-sharded tables
gcloud logging sinks create ai-assistant-logs \
  "bigquery.googleapis.com/projects/$PROJECT_ID/datasets/logging" \
  --log-filter='resource.type="cloud_run_revision" AND resource.labels.service_name="chat-service-enhanced"' \
  --use-partitioned-tables \
  --project="$PROJECT_ID"
# 4. Grant the sink permission to write
echo "🔐 Setting sink permissions..."
SINK_SA=$(gcloud logging sinks describe ai-assistant-logs \
  --project="$PROJECT_ID" \
  --format='value(writerIdentity)')
gcloud projects add-iam-policy-binding "$PROJECT_ID" \
  --member="$SINK_SA" \
  --role="roles/bigquery.dataEditor"
# 5. Create logs-based metrics (a counter metric only needs a description and a filter)
echo "📊 Creating logs-based metrics..."
gcloud logging metrics create error_count \
  --project="$PROJECT_ID" \
  --description="Error log count" \
  --log-filter='severity>=ERROR AND resource.type="cloud_run_revision" AND resource.labels.service_name="chat-service-enhanced"'
# 6. Create the notification channel
echo "📧 Creating the alert notification channel..."
CHANNEL_ID=$(gcloud alpha monitoring channels create \
  --display-name="AI Assistant Alerts" \
  --type=email \
  --channel-labels=email_address="$EMAIL" \
  --project="$PROJECT_ID" \
  --format="value(name)")
echo "✅ Notification channel: $CHANNEL_ID"
# 7. Deploy the monitoring dashboard
echo "📊 Deploying the monitoring dashboard..."
sed "s/PROJECT_ID/$PROJECT_ID/g" monitoring/dashboard.json > /tmp/dashboard-final.json
gcloud monitoring dashboards create \
  --config-from-file=/tmp/dashboard-final.json \
  --project="$PROJECT_ID"
# 8. Set the Cloud Trace sampling rate
echo "🔍 Configuring Cloud Trace..."
echo "TRACE_SAMPLE_RATE=0.1" >> .env.production
# 9. Print the results
echo ""
echo "✅ Observability stack deployed!"
echo ""
echo "📊 Dashboards: https://console.cloud.google.com/monitoring/dashboards?project=$PROJECT_ID"
echo "🔍 Logs: https://console.cloud.google.com/logs?project=$PROJECT_ID"
echo "📈 Traces: https://console.cloud.google.com/traces?project=$PROJECT_ID"
echo "💰 Billing: https://console.cloud.google.com/billing?project=$PROJECT_ID"
echo ""
echo "📝 Next steps:"
echo "1. Set up Cloud Billing Export to track the real cost"
echo "2. Tune alert thresholds to your actual traffic"
echo "3. Review the cost optimization suggestions regularly"
echo ""
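## Logging
- [ ] StructuredLogHandler is enabled (confirm that output lands in jsonPayload)
- [ ] Sensitive data is masked (user_id hashed, raw prompts not logged)
- [ ] Log retention period is set (90 days recommended)
- [ ] BigQuery sink is created and writes are tested
- [ ] Trace ID is injected into logs (so logs link to traces in the Console)
## Metrics
- [ ] All metric descriptors are created
- [ ] DELTA/GAUGE are used correctly (GAUGE for latency, DELTA for counts)
- [ ] MonitoredResource is set dynamically based on the environment
- [ ] Metric write errors do not affect the main request flow
- [ ] P50/P95/P99 latency tracking works
## Tracing
- [ ] Cloud Trace is enabled
- [ ] Sampling rate is set (10% recommended for production)
- [ ] Spans are added on critical paths
- [ ] FastAPI/HTTPX auto-instrumentation is enabled
## Alerting
- [ ] Logs-based metrics are created
- [ ] High-latency alert is configured (P95 > 3s)
- [ ] Error-rate alert is configured (> 5%)
- [ ] Cost-overrun alert is configured
- [ ] Notification channels are tested (send a test alert)
- [ ] On-call rotation is in place
## Cost control
- [ ] Application-level cost-estimate logging is enabled
- [ ] Cloud Billing Export is set up
- [ ] Weekly cost report is automated
- [ ] Cost anomalies trigger alerts automatically
- [ ] Cost optimization suggestions are reviewed regularly
## Security and compliance
- [ ] User ID is extracted securely from the JWT (not taken from a plain header)
- [ ] PII/PHI is masked or hashed
- [ ] Audit logs are complete
- [ ] Access permissions follow least privilege
## Performance optimization
- [ ] Model selection strategy is in place (Flash for simple tasks)
- [ ] Response caching is enabled
- [ ] Prompt length is optimized
- [ ] Batch processing has been considered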
# Key production monitoring targets (the four golden signals)
GOLDEN_SIGNALS = {
    "latency": {
        "p50_target": 1000,  # ms
        "p95_target": 3000,  # ms
        "p99_target": 5000,  # ms
        "metric": "custom.googleapis.com/ai_assistant/interaction_latency"
    },
    "traffic": {
        "target_qps": 100,
        "metric": "run.googleapis.com/request_count"
    },
    "errors": {
        "target_rate": 0.01,  # 1%
        "metric": "logging.googleapis.com/user/error_count"
    },
    "saturation": {
        "cpu_target": 0.8,  # 80%
        "memory_target": 0.8,
        "metric": "run.googleapis.com/container/cpu/utilizations"
    }
}
# SLO definitions
SLO = {
    "availability": {
        "target": 0.999,  # 99.9%
        "window": "30d"
    },
    "latency": {
        "target": 0.95,  # 95% of requests complete in under 3 s
        "threshold_ms": 3000,
        "window": "30d"
    },
    "error_rate": {
        "target": 0.99,  # 99% of requests succeed
        "window": "30d"
    }
}
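An SLO target translates into an error budget: the amount of failure the window can absorb before the target is missed. The sketch below works that out for the targets above. `error_budget_minutes` and `burn_rate` are hypothetical helper names, and the 0.5% observed error rate is an illustrative value, not measured data.

```python
def error_budget_minutes(target: float, window_days: int) -> float:
    """Minutes of full unavailability allowed within the window."""
    return (1 - target) * window_days * 24 * 60


def burn_rate(observed_error_rate: float, target: float) -> float:
    """How fast the budget is being spent; 1.0 exhausts it exactly at window end."""
    return observed_error_rate / (1 - target)


# Availability SLO: 99.9% over 30 days
availability_budget = error_budget_minutes(0.999, 30)

# Error-rate SLO: 99% success, with an assumed observed error rate of 0.5%
current_burn = burn_rate(0.005, 0.99)

print(f"availability budget: {availability_budget:.1f} min per 30 days")
print(f"error budget burn rate: {current_burn:.2f}")
```

For the 99.9% availability target this comes to roughly 43 minutes per 30 days. A burn rate above 1.0 means the budget will run out before the window ends, which is a common basis for alerting on SLOs rather than on raw thresholds.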